# coding: utf-8

import os
import re

from pyspark import *
from operator import *
# NOTE: `from ... import` must start from a package or a .py module, not from a bare directory
from pyspark.sql import *

from package.defs_4 import *

if __name__ == '__main__':
    # Word count over an HDFS text file with two outputs:
    #   requirement 1: counts for the normal words
    #   requirement 2: total number of special characters seen (via an accumulator)
    conf = SparkConf().setAppName("9_rdd_accumulator") \
        .setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Alternative local path for testing:
    # localhost_path = 'file://' + os.path.dirname(os.path.dirname(os.getcwd())) + '/data/input/SogouQ.txt'
    localhost_path = "hdfs://hadoop3cluster/updown/input/accumulator_broadcast_data.txt"

    rdd1 = sc.textFile(localhost_path)
    # Characters treated as "special". Broadcast once so every executor shares
    # one read-only copy instead of serializing the list into each task closure.
    abnormal_char = [",", ".", "!", "#", "$", "%"]
    broadcast_list = sc.broadcast(abnormal_char)
    # Driver-side accumulator: executors may only add; the driver reads .value.
    count = sc.accumulator(0)

    # Drop blank lines first (an empty/whitespace-only string is falsy after
    # strip()), then trim the surviving lines.
    rdd2 = rdd1.filter(lambda t: t.strip()) \
        .map(lambda line: line.strip())
    # Raw string for the regex: '\s' in a plain string is an invalid escape.
    rdd3 = rdd2.flatMap(lambda line: re.split(r'\s+', line))

    def filter_func(data) -> bool:
        """Keep normal words; count special characters and drop them."""
        if data in broadcast_list.value:
            count.add(1)  # accumulator update runs on the executor side
            return False
        return True

    rdd4 = rdd3.filter(filter_func)
    rdd5 = rdd4.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    # collect() is the action that actually triggers the job (and therefore
    # populates the accumulator).
    result = rdd5.collect()
    print(f"需求一计算结果：{result}")
    # Read the accumulator's numeric value — formatting the Accumulator object
    # itself would print "Accumulator<id=..., value=...>" instead of the count.
    print("需求二计算结果：{0}".format(count.value))

    sc.stop()

































